(零基础可以看懂)强化学习中的蒙特卡罗应用(贝尔曼方程)(含代码) 您所在的位置:网站首页 《悬案》系列专栏第023篇 白屋血案在线阅读 (零基础可以看懂)强化学习中的蒙特卡罗应用(贝尔曼方程)(含代码)

(零基础可以看懂)强化学习中的蒙特卡罗应用(贝尔曼方程)(含代码)

2023-10-14 05:13| 来源: 网络整理| 查看: 265

(零基础可以看懂)强化学习中的蒙特卡洛应用(贝尔曼方程)(含代码)-《强化学习系列专栏第2篇》 介绍蒙特卡罗方法的介绍First-Visit Monte-Carlo Policy Evaluation(首次访问型蒙特卡罗策略估计)Every-Visit Monte-Carlo Policy Evaluation(每次访问型蒙特卡罗策略估计) 代码复现、详细讲解及我的Github地址

介绍

    蒙特卡罗方法,简单一句话来理解,就是基于大数定律,使用采样的方式来估算分布。应用在强化学习中,更多的是将复杂环境的模型给“替换”掉。这个怎么理解呢?《(零基础可以看懂)强化学习中的动态规划(贝尔曼方程)(含代码)-》这篇文章中,我说明了如何使用迭代法去近似最优解,但使用这个算法的前提是我们需要知道环境的信息,比如我上述文章的例子中,我向左走,会走到哪个状态,或者是按照什么样的概率获得多少的回报等等这些信息。但是实际情况下,状态可能有上千万种,比如围棋一共有361个个字,每个格子都有两种情况(黑棋或白棋),那粗略的计算一下,也将近有 2 361 2^{361} 2361种状态,想象一下使用上述文章中的迭代法来近似最优解,其计算复杂度有多高,普通PC机根本求解不出来。因此,也就诞生了使用蒙特卡洛方法去近似贝尔曼方程的最优解。而蒙特卡罗方法求解的时候只需要无脑的采样(也就是让智能体在环境中执行动作,获得奖励,执行动作,获得奖励…………结束),然后根据采样到的结果,取平均,就能得到一个估计值,并且这个估计值是无偏估计(也就是估计值与真实值的方差随着采样数量的增加,而趋于0)。     这里说一句,需要使用环境的模型去求解最优解的算法称为Model-Based方法。不需要知道环境的信息就可以求解最优解的算法称为Model-Free方法。

蒙特卡罗方法的介绍

    使用蒙特卡罗方法的时候,采样的数据是以一幕一幕(一个episode被称为一幕)划分的。什么叫一幕呢?比如玩一把游戏,玩一次迷宫,完好之后,我们可以统计出这样的序列出来,这样的序列称为一幕。 s 1 , a 1 , r 2 , s 2 , a 2 , r 3 , . . . , . . . , s t − 1 , a t − 1 , r t , s t , a t , r t + 1 , t e r m i n a l s_1,a_1,r_2,s_2,a_2,r_3,...,...,s_{t-1},a_{t-1},r_t,s_t,a_t,r_{t+1},terminal s1​,a1​,r2​,s2​,a2​,r3​,...,...,st−1​,at−1​,rt​,st​,at​,rt+1​,terminal

    蒙特卡罗方法分为First-Visit Monte-Carlo Policy Evaluation(首次访问型蒙特卡罗策略估计)和Every-Visit Monte-Carlo Policy Evaluation(每次访问型蒙特卡罗策略估计)。它们的区别在于在采样的时候,在计算样本数量的时候,每次访问型蒙特卡罗策略估计的方法在一幕中,出现几次的要统计的量,那就统计几次。而首次访问型蒙特卡罗策略估计在一幕中只会统计一次要统计的量,尽管出现了多次。

First-Visit Monte-Carlo Policy Evaluation(首次访问型蒙特卡罗策略估计)

    举个例子来说,假如我们现在要用“首次访问型蒙特卡罗策略估计”去估算动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​)的价值。一共采样了 N N N幕,其中共有 N ( s ) N(s) N(s)幕出现了动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​),那么这个时候,按以下方法去计算动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​)的价值。 (1)记录每一幕中,第一次出现动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​)开始,直到该幕结束的reward。然后将每一幕reward相加求和,记为 S ( s ) S(s) S(s)。 (2)计算总共有多少幕中,出现了动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​),该数量记为 N ( s ) N(s) N(s) 根据公式: q ( s 0 , a 0 ) = S ( s ) N ( s ) q(s_0,a_0) = \frac{S(s)}{N(s)} q(s0​,a0​)=N(s)S(s)​ 计算动作状态 q ( s 0 , a 0 ) q(s_0,a_0) q(s0​,a0​)的价值。

(1)以下是我写的代码,代码中有几点要说明下,self.episode变量代表着一共要采样多少幕的数据,这里我设定了10万幕,10万幕基本上已经很准确了。 (2)训练结束后,show_policy(self)方法中,policy_dict存储着策略。举个例子来说,policy_dict[‘22’]就代表着(2代表着第3行,1代表第2行,0代表第1行,依次类推,因为代码里是从0开始的,所以也要相应的转变一下。),当你处在在第3行第3列的格子里的时候,这个时候根据以下四个动作的“状态动作”价值,可以判断出,执行动作“向下走”的价值是最高的,所以这个时候我们选择向下走。细心的人可能已经发现了,“向右走”和“向下走”的价值是很接近的,确实是这样的,由于对称性,理论上来说,这两个动作的价值也是一样的。最终,我们根据这个policy_dict变量,就可以制作出我们的策略了。 在这里插入图片描述

# encoding=utf-8 ''' Author: Haitaifantuan Create Date: 2020-09-08 23:47:11 Author Email: [email protected] Description: Should you have any question, do not hesitate to contact me via E-mail. ''' import numpy as np import random import time class First_Visit_Monte_Carlo_Policy_Evaluation(object): def __init__(self): self.total_rows = 4 self.total_columns = 4 self.total_action_num = 4 # 0代表上,1代表右,2代表下,3代表左 self.reward_each_step = -1 self.action_dict = {0: '上', 1: '右', 2: '下', 3: '左'} self.reversed_action_dict = {'上': 0, '右':1, '下':2, '左': 3} # 分别是走上、下、左、右的概率。随机数命中某个数字如49,那就是向右。随机数只在0-100随机选数字。 self.four_action_probability = {'上': range(0, 25), '右': range(25, 50), '下': range(50, 75), '左': range(75, 100)} self.idx_change_dict = {'上': (-1, 0), '右': (0, 1), '下': (1, 0), '左': (0, -1)} # 左边这个是行的索引的改变,右边这个是列的索引的改变 self.episode = 100000 # 共采集TOTAL_ITERATION幕数据 # 初始化状态价值函数V maze = np.zeros((self.total_rows, self.total_columns)) # 用0代表迷宫的每一格,这个maze只是方便我们看迷宫,没有其他作用。 print(maze) def get_current_reward_and_next_state(self, current_state, action): ''' 根据当前的状态,以及行为,计算当前行为的奖励以及下一个状态 ''' row_idx, column_idx = current_state # 计算下下一步的state和reward next_row_idx = row_idx + self.idx_change_dict[action][0] next_column_idx = column_idx + self.idx_change_dict[action][1] # 先判断是否到了终点,如果是终点,不管执行什么操作 # 奖励都是0,并且都会回到终点 if (next_row_idx == 0 and next_column_idx == 0): return 0, (0, 0) if (next_row_idx == 3 and next_column_idx == 3): return 0, (3, 3) # 再判断是否在边缘,如果是的话,那就回到该位置。 if next_row_idx self.total_rows - 1 or next_column_idx self.total_columns - 1: return self.reward_each_step, (row_idx, column_idx) else: return self.reward_each_step, (next_row_idx, next_column_idx) def generate_initial_state(self, total_rows, total_columns): row_idx = random.randint(0, total_rows - 1) column_idx = random.randint(0, total_columns - 1) while (row_idx == 0 and column_idx == 0) or (row_idx == 3 and column_idx == 3): row_idx = random.randint(0, total_rows - 1) column_idx = random.randint(0, total_columns - 1) return (row_idx, column_idx) def generate_one_episode_data(self, init_state): one_episode_data = [] current_state = init_state while not ((current_state[0] == 0 and current_state[1] == 0) or (current_state[0] == 3 and current_state[1] == 3)): # 根据概率产生一个动作 rand_int = random.randint(0, 99) for each in self.four_action_probability.items(): if rand_int in each[1]: action = each[0] break # 根据要走的动作得到奖励以及获取下一个状态 reward, next_state = self.get_current_reward_and_next_state(current_state, action) # (当前状态,当前行为,当前行为的奖励) one_episode_data.append((current_state, self.reversed_action_dict[action], reward)) current_state = next_state # while循环出来的时候,最后一个terminal状态没加进去。 one_episode_data.append((current_state, None, None)) return one_episode_data def fire_calculation(self): # 计算“状态-动作价值” # 创建一个字典保存出现的状态-动作以及奖励 begin_time = time.time() episode_record_dict = {} final_state_action_reward_dict = {} # 这个和state_action_reward_dict是有区别的,它是记录总体的。 final_state_action_count_dict = {} for episode in range(self.episode): # 生成每一幕的数据 all_episode_list = [] # 随机生成一个起始状态 init_state = self.generate_initial_state(self.total_rows, self.total_columns) # 生成一幕数据 episode_record_dict[episode] = self.generate_one_episode_data(init_state) # 对这幕数据进行遍历,然后将出现过的状态进行统计 has_been_counted = {} # 记录状态是否在该幕出现过 state_action_reward_dict = {} # 记录每一个状态当前的总共的reward for idx, eachTuple in enumerate(episode_record_dict[episode]): # 先判断是不是到了终点,如果是的话就跳出循环 if idx == len(episode_record_dict[episode])-1: break # 将state和action组合成字符串,方便作为dict的key state_action_combination = str(eachTuple[0][0]) + str(eachTuple[0][1]) + str(eachTuple[1]) # 对state_action_reward_dict()里的所有的key都累加当前的reward。 for key in state_action_reward_dict.keys(): state_action_reward_dict[key] += eachTuple[2] # 检测当前这一幕该状态和动作组合是否出现过 if state_action_combination not in has_been_counted.keys(): # 如果不存在在state_count_dict.keys()里,说明是第一次碰到该状态。 has_been_counted[state_action_combination] = 1 # 随便赋值一个value state_action_reward_dict[state_action_combination] = eachTuple[2] # 将该募最后统计到总的变量里。 for state_action, reward in state_action_reward_dict.items(): if state_action not in final_state_action_reward_dict.keys(): final_state_action_reward_dict[state_action] = reward # 将该状态-动作计数设为reward final_state_action_count_dict[state_action] = 1 # 将该状态-动作计数设为1 else: # 否则说明其他幕中出现过该状态-动作,并且曾经统计到final_state_action_reward_dict和final_state_action_count_dict变量里面 # 直接累加就好了。 final_state_action_reward_dict[state_action] += reward final_state_action_count_dict[state_action] += 1 if episode % 100 == 0: print("第{}个episode已完成=====已花费{}分钟".format(episode, (time.time() - begin_time) / 60)) # 计算下最终的状态-动作价值 # 由于是按概率采样,因此可能会导致某些动作-状态没有出现过,这个时候就需要一些方法去解决了。 # 一种方法是增加采样次数,这种方法相当于是暴力解决。 # 另一种方法可以参考sutton的《强化学习第二版》的98页的5.4内容 self.averaged_state_action_value_dict = {} for state_action, reward in final_state_action_reward_dict.items(): self.averaged_state_action_value_dict[state_action] = reward / final_state_action_count_dict[state_action] # print(self.averaged_state_action_value_dict) def show_policy(self): policy_dict = {} for state_action, value in self.averaged_state_action_value_dict.items(): if state_action[0:2] not in policy_dict.keys(): policy_dict[state_action[0:2]] = {self.action_dict[int(state_action[2])]: value} else: policy_dict[state_action[0:2]][self.action_dict[int(state_action[2])]] = value print(policy_dict) obj = First_Visit_Monte_Carlo_Policy_Evaluation() obj.fire_calculation() obj.show_policy() Every-Visit Monte-Carlo Policy Evaluation(每次访问型蒙特卡罗策略估计)

    举个例子来说,假如我们现在要用“每次访问型蒙特卡罗策略估计”去估算状态 s 0 s_0 s0​的价值。一共采样了 N N N幕,这个时候,按以下方法去计算状态 s 0 s_0 s0​的价值。 (1)记录每一幕中,每次只要出现状态 s 0 s_0 s0​,就开始计算累积的reward,直到该幕结束。 (2)然后将每一次所记录的总的reward相加求和,记为 S ( s ) S(s) S(s)。 (2)统计总共有多少次出现了状态 s 0 s_0 s0​,该数量记为 N ( s ) N(s) N(s) 根据公式: v ( s 0 ) = S ( s ) N ( s ) v(s_0) = \frac{S(s)}{N(s)} v(s0​)=N(s)S(s)​ 计算状态 v ( s 0 ) v(s_0) v(s0​)的价值。

(1)以下是我写的代码,代码中有几点要说明下,self.episode变量代表着一共要采样多少幕的数据,这里我设定了10万幕,10万幕基本上已经很准确了。 (2)训练结束后,我们可以得到这张图,这张图,代表了这样的意思:假如你处在第2行第2列的格子里(在图片中就是第1行第0列,因为行和列都是从0开始的),这个格子的状态价值是-17.11,然后我们环顾四周,发现第2行第1列(图片中是第1行第0列)的格子的状态价值最高,因此我们下一步往这个格子走。细心的人可能发现了,第1行第2列(图片中是第0行第1列)和第2行第1列(图片中是第1行第0列)的价值很接近,确实是这样的,由于对称性,理论上来说,这两个状态的价值应该是一样的,只不过由于采样的原因,导致他们的值略有浮动。就这样,根据这个图,我们就可以制定出我们的策略了。 在这里插入图片描述

# encoding=utf-8 ''' Author: Haitaifantuan Create Date: 2020-09-08 23:47:11 Author Email: [email protected] Description: Should you have any question, do not hesitate to contact me via E-mail. ''' import numpy as np import random import time import matplotlib.pyplot as plt from matplotlib.table import Table # 解决plt显示中文的问题 plt.rcParams['font.sans-serif']=['SimHei'] plt.rcParams['axes.unicode_minus'] = False class Every_Visit_Monte_Carlo_Policy_Evaluation(object): def __init__(self): self.total_rows = 4 self.total_columns = 4 self.total_action_num = 4 # 0代表上,1代表右,2代表下,3代表左 self.reward_each_step = -1 self.action_dict = {0: '上', 1: '右', 2: '下', 3: '左'} self.reversed_action_dict = {'上': 0, '右':1, '下':2, '左': 3} # 分别是走上、下、左、右的概率。随机数命中某个数字如49,那就是向右。随机数只在0-100随机选数字。 self.four_action_probability = {'上': range(0, 25), '右': range(25, 50), '下': range(50, 75), '左': range(75, 100)} self.idx_change_dict = {'上': (-1, 0), '右': (0, 1), '下': (1, 0), '左': (0, -1)} # 左边这个是行的索引的改变,右边这个是列的索引的改变 self.episode = 100000 # 共采集TOTAL_ITERATION幕数据 # 初始化状态价值函数V # 用0代表迷宫的每一格,这个maze只是方便我们看迷宫,没有其他作用。 maze = np.zeros((self.total_rows, self.total_columns)) print(maze) def get_current_reward_and_next_state(self, current_state, action): ''' 根据当前的状态,以及行为,计算当前行为的奖励以及下一个状态 ''' row_idx, column_idx = current_state # 计算下下一步的state和reward next_row_idx = row_idx + self.idx_change_dict[action][0] next_column_idx = column_idx + self.idx_change_dict[action][1] # 先判断是否到了终点,如果是终点,不管执行什么操作 # 奖励都是0,并且都会回到终点 if (next_row_idx == 0 and next_column_idx == 0): return 0, (0, 0) if (next_row_idx == 3 and next_column_idx == 3): return 0, (3, 3) # 再判断是否在边缘,如果是的话,那就回到该位置。 if next_row_idx self.total_rows - 1 or next_column_idx self.total_columns - 1: return self.reward_each_step, (row_idx, column_idx) else: return self.reward_each_step, (next_row_idx, next_column_idx) def generate_initial_state(self, total_rows, total_columns): row_idx = random.randint(0, total_rows - 1) column_idx = random.randint(0, total_columns - 1) while (row_idx == 0 and column_idx == 0) or (row_idx == 3 and column_idx == 3): row_idx = random.randint(0, total_rows - 1) column_idx = random.randint(0, total_columns - 1) return (row_idx, column_idx) def generate_one_episode_data(self, init_state): one_episode_data = [] current_state = init_state while not ((current_state[0] == 0 and current_state[1] == 0) or (current_state[0] == 3 and current_state[1] == 3)): # 根据概率产生一个动作 rand_int = random.randint(0, 99) for each in self.four_action_probability.items(): if rand_int in each[1]: action = each[0] break # 根据要走的动作得到奖励以及获取下一个状态 reward, next_state = self.get_current_reward_and_next_state(current_state, action) # (当前状态,当前行为,当前行为的奖励) one_episode_data.append((current_state, self.reversed_action_dict[action], reward)) current_state = next_state # while循环出来的时候,最后一个terminal状态没加进去。 one_episode_data.append((current_state, None, None)) return one_episode_data def fire_calculation(self): # 计算“状态价值” # 创建一个字典保存出现的状态以及奖励 begin_time = time.time() episode_record_dict = {} final_state_reward_dict = {} final_state_count_dict = {} for episode in range(self.episode): # 生成每一幕的数据 all_episode_list = [] # 随机生成一个起始状态 init_state = self.generate_initial_state(self.total_rows, self.total_columns) # 生成一幕数据 current_generated_episode = self.generate_one_episode_data(init_state) episode_record_dict[episode] = current_generated_episode # 对这幕数据进行遍历,然后将出现过的状态进行统计 timeStep_state_reward_dict = {} # 记录每一个状态当前的总共的reward for timeStep, eachTuple in enumerate(current_generated_episode): # 先判断是不是到了终点,如果是的话就跳出循环 if timeStep == len(episode_record_dict[episode])-1: break # 将state和timeStep组合成字符串,方便作为dict的key timeStep_state_combination = str(timeStep) + str(eachTuple[0][0]) + str(eachTuple[0][1]) # 对state_action_reward_dict()里的所有的key都累加当前的reward。 for key in timeStep_state_reward_dict.keys(): timeStep_state_reward_dict[key] += eachTuple[2] # 检测当前这一幕该状态和动作组合是否出现过 if timeStep_state_combination not in timeStep_state_reward_dict.keys(): # 如果不存在在timeStep_state_reward_dict.keys()里,那就把它加进去。 # 其实每一个时间点都会被添加进去,因为这是“每次访问型蒙特卡罗策略估计”。 timeStep_state_reward_dict[timeStep_state_combination] = eachTuple[2] # 将该募最后统计到总的变量里。 for timeStep_state, reward in timeStep_state_reward_dict.items(): # 把timeStep剥离开,取出state state = timeStep_state[-2:] if state not in final_state_reward_dict.keys(): final_state_reward_dict[state] = reward # 将该状态-动作计数设为reward final_state_count_dict[state] = 1 # 将该状态-动作计数设为1 else: # 否则说明其他幕中出现过该状态,并且曾经统计到final_state_action_reward_dict和final_state_action_count_dict变量里面 # 直接累加就好了。 final_state_reward_dict[state] += reward final_state_count_dict[state] += 1 if episode % 100 == 0: print("第{}个episode已完成=====已花费{}分钟".format(episode, (time.time() - begin_time) / 60)) # 计算下最终的状态价值 # 由于是按概率采样,因此可能会导致某些动作-状态没有出现过,这个时候就需要一些方法去解决了。 # 一种方法是增加采样次数,这种方法相当于是暴力解决。 # 另一种方法可以参考sutton的《强化学习第二版》的98页的5.4内容 self.averaged_state_value_dict = {} for state, reward in final_state_reward_dict.items(): self.averaged_state_value_dict[state] = reward / final_state_count_dict[state] print(self.averaged_state_value_dict) def draw_value_picture(self): fig, ax = plt.subplots() ax.set_axis_off() tbl = Table(ax, bbox=[0, 0, 1, 1]) width = 1.0 / (self.total_columns + 1) height = 1.0 / (self.total_rows + 1) # 给表格的中间赋值,赋值为该状态的价值 for row_idx in range(self.total_rows): for column_idx in range(self.total_columns): if (row_idx == 0 and column_idx == 0) or (row_idx == 3 and column_idx == 3): value = 0 else: value = self.averaged_state_value_dict[str(row_idx)+str(column_idx)] tbl.add_cell(row_idx+1, column_idx+1, width, height, text=value, loc='center', facecolor='white') # 给表格行加上索引 for row_idx in range(self.total_rows): tbl.add_cell(row_idx+1, 0, width, height, text=row_idx, loc='right', edgecolor='none', facecolor='none') # 给表格列加上索引 for column_idx in range(self.total_columns): tbl.add_cell(0, column_idx+1, width, height/4, text=column_idx, loc='center', edgecolor='none', facecolor='none') ax.add_table(tbl) plt.show() obj = Every_Visit_Monte_Carlo_Policy_Evaluation() obj.fire_calculation() obj.draw_value_picture() 代码复现、详细讲解及我的Github地址

完整代码地址:https://github.com/haitaifantuan/reinforcement_leanring



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有